package com.goldmantis.activemq.ch3.queue;

import java.io.IOException;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.Topic;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.DataStructure;
import org.apache.activemq.command.DestinationInfo;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.RemoveInfo;

/**
 * Demo client that subscribes to ActiveMQ advisory topics and prints what it receives.
 *
 * <p>{@link #main(String[])} listens on the connection advisory topic and then calls
 * {@link #listenToConsumerAdvisories()}, which listens on the consumer and producer
 * advisory topics of the queue {@code JOBS.1}. All output goes to {@code System.out}
 * and {@code System.err}; the connections opened here are never closed by this class.
 */
public class AdvisoryMessage {

    /** Broker address used for every connection opened by this class. */
    private static final String BROKER_URL = "tcp://localhost:51616";

    /**
     * Subscribes to the connection advisory topic, prints connection start/stop events,
     * and then registers the queue-level advisory listeners.
     *
     * @param args ignored
     * @throws Exception if the connection, session or any consumer cannot be created
     */
    public static void main(String[] args) throws Exception {
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(BROKER_URL);
        Connection connection = connectionFactory.createConnection();
        connection.start();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Topic connectionAdvisory = org.apache.activemq.advisory.AdvisorySupport.CONNECTION_ADVISORY_TOPIC;
        MessageConsumer consumer = session.createConsumer(connectionAdvisory);
        consumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                DataStructure data = ((ActiveMQMessage) message).getDataStructure();
                if (data == null) {
                    // No payload to classify; report it rather than fail with a NullPointerException.
                    System.err.println("Unknown message " + message);
                    return;
                }
                byte type = data.getDataStructureType();
                if (type == ConnectionInfo.DATA_STRUCTURE_TYPE) {
                    ConnectionInfo connectionInfo = (ConnectionInfo) data;
                    System.out.println("Connection started: " + connectionInfo);
                } else if (type == RemoveInfo.DATA_STRUCTURE_TYPE) {
                    RemoveInfo removeInfo = (RemoveInfo) data;
                    System.out.println("Connection stopped: " + removeInfo.getObjectId());
                } else {
                    System.err.println("Unknown message " + data);
                }
            }
        });

        listenToConsumerAdvisories();
    }

    /**
     * Opens a second connection, creates a consumer on the queue {@code JOBS.1}, and
     * registers listeners on that queue's consumer and producer advisory topics.
     *
     * @throws Exception if the connection, session or any consumer cannot be created
     */
    public static void listenToConsumerAdvisories() throws Exception {
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(BROKER_URL);
        Connection connection = connectionFactory.createConnection();
        connection.start();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        // Create a consumer on the queue and leave it open; presumably this gives the
        // consumer advisory listener below something to report.
        Queue queue = session.createQueue("JOBS.1");
        session.createConsumer(queue);

        // Listen for consumers starting/stopping on the queue.
        Topic consumerAdvisoryTopic =
                org.apache.activemq.advisory.AdvisorySupport.getConsumerAdvisoryTopic(queue);
        session.createConsumer(consumerAdvisoryTopic)
                .setMessageListener(newAdvisoryListener("Consumer Count = ", "consumerCount"));

        // Listen for producers on the same queue.
        Topic producerAdvisoryTopic =
                org.apache.activemq.advisory.AdvisorySupport.getProducerAdvisoryTopic(queue);
        session.createConsumer(producerAdvisoryTopic)
                .setMessageListener(newAdvisoryListener("producer Count = ", "producerCount"));
    }

    /**
     * Builds a listener that prints the {@code originBrokerId} property, the given count
     * property, and a description of the advisory payload.
     *
     * @param countLabel text printed in front of the count value
     * @param countProperty name of the string property holding the count
     * @return a listener suitable for a consumer or producer advisory topic
     */
    private static MessageListener newAdvisoryListener(final String countLabel, final String countProperty) {
        return new MessageListener() {
            @Override
            public void onMessage(Message m) {
                ActiveMQMessage message = (ActiveMQMessage) m;
                try {
                    System.out.println(message.getProperty("originBrokerId"));
                } catch (IOException e) {
                    // Best effort: the property is informational, so report and carry on.
                    System.err.println("Could not read originBrokerId: " + e);
                }
                try {
                    System.out.println(countLabel + m.getStringProperty(countProperty));
                    describe(message.getDataStructure());
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        };
    }

    /**
     * Prints one line describing an advisory payload, or an "Unknown message" line on
     * {@code System.err} when the payload is missing or of an unhandled type.
     *
     * @param data the payload taken from the advisory message; may be {@code null}
     */
    private static void describe(DataStructure data) {
        if (data == null) {
            System.err.println("Unknown message " + data);
            return;
        }
        byte type = data.getDataStructureType();
        if (type == ConsumerInfo.DATA_STRUCTURE_TYPE) {
            ConsumerInfo consumerInfo = (ConsumerInfo) data;
            System.out.println("Consumer started: " + consumerInfo);
        } else if (type == ProducerInfo.DATA_STRUCTURE_TYPE) {
            ProducerInfo producerInfo = (ProducerInfo) data;
            System.out.println("producer info: " + producerInfo.getProducerId());
        } else if (type == DestinationInfo.DATA_STRUCTURE_TYPE) {
            DestinationInfo destinationInfo = (DestinationInfo) data;
            System.out.println("DestinationInfo: " + destinationInfo.getConnectionId());
        } else if (type == RemoveInfo.DATA_STRUCTURE_TYPE) {
            // Stop events carry a RemoveInfo, handled the same way as in main's listener.
            RemoveInfo removeInfo = (RemoveInfo) data;
            System.out.println("Stopped: " + removeInfo.getObjectId());
        } else {
            System.err.println("Unknown message " + data);
        }
    }
}
